/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { Icon, Intent, Menu, MenuItem, Popover, Position, Tag } from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
import * as JSONBig from 'json-bigint-native';
import memoize from 'memoize-one';
import type { JSX } from 'react';
import React, { createContext, useContext } from 'react';
import type { Column, SortingRule } from 'react-table';
import ReactTable from 'react-table';

import type { TableColumnSelectorColumn } from '../../components';
import {
  ACTION_COLUMN_ID,
  ACTION_COLUMN_LABEL,
  ACTION_COLUMN_WIDTH,
  ActionCell,
  MoreButton,
  RefreshButton,
  TableClickableCell,
  TableColumnSelector,
  TableFilterableCell,
  ViewControlBar,
} from '../../components';
import {
  AlertDialog,
  AsyncActionDialog,
  SpecDialog,
  SupervisorResetOffsetsDialog,
  SupervisorTableActionDialog,
  TaskGroupHandoffDialog,
} from '../../dialogs';
import type {
  ConsoleViewId,
  IngestionSpec,
  QueryWithContext,
  RowStatsKey,
  SupervisorStats,
  SupervisorStatus,
  SupervisorStatusTask,
} from '../../druid-models';
import { getConsoleViewIcon, getTotalSupervisorStats } from '../../druid-models';
import type { Capabilities } from '../../helpers';
import {
  SMALL_TABLE_PAGE_SIZE,
  SMALL_TABLE_PAGE_SIZE_OPTIONS,
  suggestibleFilterInput,
} from '../../react-table';
import { Api, AppToaster } from '../../singletons';
import type { AuxiliaryQueryFn, TableState } from '../../utils';
import {
  assemble,
  checkedCircleIcon,
  deepGet,
  filterMap,
  formatByteRate,
  formatBytes,
  formatInteger,
  formatRate,
  getApiArray,
  getDruidErrorMessage,
  hasOverlayOpen,
  isNumberLike,
  LocalStorageBackedVisibility,
  LocalStorageKeys,
  nonEmptyArray,
  oneOf,
  pluralIfNeeded,
  queryDruidSql,
  QueryManager,
  QueryState,
  ResultWithAuxiliaryWork,
  sortedToOrderByClause,
  twoLines,
} from '../../utils';
import type { BasicAction } from '../../utils/basic-action';
import { TableFilters } from '../../utils/table-filters';

import './supervisors-view.scss';

const SUPERVISOR_TABLE_COLUMNS: TableColumnSelectorColumn[] = [
  'Supervisor ID',
  'Datasource',
  'Type',
  'Topic/Stream',
  'Status',
  'Configured tasks',
  { text: 'Running tasks', label: 'status API' },
  { text: 'Aggregate lag', label: 'status API' },
  { text: 'Stats', label: 'stats API' },
  { text: 'Recent errors', label: 'status API' },
];

const ROW_STATS_KEYS: RowStatsKey[] = ['1m', '5m', '15m'];
const STATUS_API_TIMEOUT = 5000;
const STATS_API_TIMEOUT = 5000;

function getRowStatsKeyTitle(key: RowStatsKey) {
  return `Rate over past ${pluralIfNeeded(parseInt(key, 10), 'minute')}`;
}

function getRowStatsKeySeconds(key: RowStatsKey): number {
  return parseInt(key, 10) * 60;
}

interface SupervisorQuery extends TableState {
  capabilities: Capabilities;
  visibleColumns: LocalStorageBackedVisibility;
}

interface SupervisorQueryResultRow {
  readonly supervisor_id: string;
  readonly datasource: string;
  readonly type: string;
  readonly source: string;
  readonly detailed_state: string;
  readonly spec?: IngestionSpec;
  readonly suspended: boolean;
}

interface SupervisorsWithAuxiliaryInfo {
  readonly supervisors: SupervisorQueryResultRow[];
  readonly count: number;
  readonly status: Record<string, SupervisorStatus>;
  readonly stats: Record<string, SupervisorStats>;
}

export const StatusContext = createContext<Record<string, SupervisorStatus>>({});
export const StatsContext = createContext<{
  stats: Record<string, SupervisorStats>;
  statsKey: RowStatsKey;
}>({ stats: {}, statsKey: '5m' });

interface HeaderStatsKeySelectorProps {
  changeStatsKey(statsKey: RowStatsKey): void;
}

function HeaderStatsKeySelector({ changeStatsKey }: HeaderStatsKeySelectorProps) {
  const { statsKey } = useContext(StatsContext);
  return (
    <Popover
      position={Position.BOTTOM}
      content={
        <Menu>
          {ROW_STATS_KEYS.map(k => (
            <MenuItem
              key={k}
              icon={checkedCircleIcon(k === statsKey)}
              text={getRowStatsKeyTitle(k)}
              onClick={() => changeStatsKey(k)}
            />
          ))}
        </Menu>
      }
    >
      <i className="title-button">
        {getRowStatsKeyTitle(statsKey)} <Icon icon={IconNames.CARET_DOWN} />
      </i>
    </Popover>
  );
}

export interface SupervisorsViewProps {
  filters: TableFilters;
  onFiltersChange(filters: TableFilters): void;
  openSupervisorDialog: boolean | undefined;
  goToView(tab: ConsoleViewId, filters?: TableFilters): void;
  goToQuery(queryWithContext: QueryWithContext): void;
  goToStreamingDataLoader(supervisorId: string): void;
  capabilities: Capabilities;
}

export interface SupervisorsViewState {
  supervisorsState: QueryState<SupervisorsWithAuxiliaryInfo>;
  statsKey: RowStatsKey;

  resumeSupervisorId?: string;
  suspendSupervisorId?: string;
  handoffSupervisorId?: string;
  resetOffsetsSupervisorInfo?: { id: string; type: string };
  resetSupervisorId?: string;
  terminateSupervisorId?: string;

  showResumeAllSupervisors: boolean;
  showSuspendAllSupervisors: boolean;
  showTerminateAllSupervisors: boolean;

  supervisorSpecDialogOpen: boolean;
  alertErrorMsg?: string;

  supervisorTableActionDialogId?: string;
  supervisorTableActionDialogActions: BasicAction[];

  visibleColumns: LocalStorageBackedVisibility;
  page: number;
  pageSize: number;
  sorted: SortingRule[];
}

function detailedStateToColor(detailedState: string): string {
  switch (detailedState) {
    case 'UNHEALTHY_SUPERVISOR':
    case 'UNHEALTHY_TASKS':
    case 'UNABLE_TO_CONNECT_TO_STREAM':
    case 'LOST_CONTACT_WITH_STREAM':
    case 'INVALID_SPEC':
      return '#d5100a';

    case 'PENDING':
      return '#00eaff';

    case 'DISCOVERING_INITIAL_TASKS':
    case 'CREATING_TASKS':
    case 'CONNECTING_TO_STREAM':
    case 'RUNNING':
      return '#2167d5';

    case 'IDLE':
      return '#44659d';

    case 'STOPPING':
      return '#e75c06';

    case 'SUSPENDED':
    case 'SCHEDULER_STOPPED':
      return '#ffbf00';

    default:
      return '#0a1500';
  }
}

export class SupervisorsView extends React.PureComponent<
  SupervisorsViewProps,
  SupervisorsViewState
> {
  private readonly supervisorQueryManager: QueryManager<
    SupervisorQuery,
    SupervisorsWithAuxiliaryInfo
  >;

  constructor(props: SupervisorsViewProps) {
    super(props);

    this.state = {
      supervisorsState: QueryState.INIT,
      statsKey: '5m',

      showResumeAllSupervisors: false,
      showSuspendAllSupervisors: false,
      showTerminateAllSupervisors: false,

      supervisorSpecDialogOpen: Boolean(props.openSupervisorDialog),

      supervisorTableActionDialogActions: [],

      visibleColumns: new LocalStorageBackedVisibility(
        LocalStorageKeys.SUPERVISOR_TABLE_COLUMN_SELECTION,
      ),
      page: 0,
      pageSize: SMALL_TABLE_PAGE_SIZE,
      sorted: [],
    };

    this.supervisorQueryManager = new QueryManager({
      processQuery: async (
        { capabilities, visibleColumns, filtered, sorted, page, pageSize },
        signal,
        setIntermediateQuery,
      ) => {
        let supervisors: SupervisorQueryResultRow[];
        let count = -1;
        const auxiliaryQueries: AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo>[] = [];
        if (capabilities.hasSql()) {
          const whereExpression = filtered.toSqlExpression();

          let filterClause = '';
          if (whereExpression.toString() !== 'TRUE') {
            filterClause = whereExpression.toString();
          }

          const sqlQuery = assemble(
            'WITH s AS (SELECT',
            '  "supervisor_id",',
            '  "datasource",',
            '  "type",',
            '  "source",',
            `  CASE WHEN "suspended" = 0 THEN "detailed_state" ELSE 'SUSPENDED' END AS "detailed_state",`,
            visibleColumns.shown('Configured tasks') ? '  "spec",' : undefined,
            '  "suspended" = 1 AS "suspended"',
            'FROM "sys"."supervisors")',
            'SELECT *',
            'FROM s',
            filterClause ? `WHERE ${filterClause}` : undefined,
            sortedToOrderByClause(sorted),
            `LIMIT ${pageSize}`,
            page ? `OFFSET ${page * pageSize}` : undefined,
          ).join('\n');
          setIntermediateQuery(sqlQuery);
          supervisors = (
            await queryDruidSql<SupervisorQueryResultRow>(
              {
                query: sqlQuery,
              },
              signal,
            )
          ).map(supervisor => {
            const spec: any = supervisor.spec;
            if (typeof spec !== 'string') return supervisor;
            return { ...supervisor, spec: JSONBig.parse(spec) };
          });

          auxiliaryQueries.push(async (supervisorsWithAuxiliaryInfo, signal) => {
            const sqlQuery = assemble(
              'SELECT COUNT(*) AS "cnt"',
              'FROM "sys"."supervisors"',
              filterClause ? `WHERE ${filterClause}` : undefined,
            ).join('\n');
            const cnt: any = (
              await queryDruidSql<{ cnt: number }>(
                {
                  query: sqlQuery,
                },
                signal,
              )
            )[0].cnt;
            return {
              ...supervisorsWithAuxiliaryInfo,
              count: typeof cnt === 'number' ? cnt : -1,
            };
          });
        } else if (capabilities.hasOverlordAccess()) {
          supervisors = (await getApiArray('/druid/indexer/v1/supervisor?full', signal)).map(
            (sup: any) => {
              return {
                supervisor_id: deepGet(sup, 'id'),
                datasource: deepGet(sup, 'dataSource'),
                type: deepGet(sup, 'spec.tuningConfig.type'),
                source:
                  deepGet(sup, 'spec.ioConfig.topic') ||
                  deepGet(sup, 'spec.ioConfig.stream') ||
                  'n/a',
                state: deepGet(sup, 'state'),
                detailed_state: deepGet(sup, 'detailedState'),
                spec: sup.spec,
                suspended: Boolean(deepGet(sup, 'suspended')),
              };
            },
          );
          count = supervisors.length;

          const firstSorted = sorted[0];
          if (firstSorted) {
            const { id, desc } = firstSorted;
            supervisors.sort((s1: any, s2: any) => {
              return (
                String(s1[id]).localeCompare(String(s2[id]), undefined, { numeric: true }) *
                (desc ? -1 : 1)
              );
            });
          }
        } else {
          throw new Error(`must have SQL or overlord access`);
        }

        if (capabilities.hasOverlordAccess()) {
          if (visibleColumns.shown('Running tasks', 'Aggregate lag', 'Recent errors')) {
            auxiliaryQueries.push(
              ...supervisors.map(
                (supervisor): AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo> =>
                  async (supervisorsWithAuxiliaryInfo, signal) => {
                    const status = (
                      await Api.instance.get(
                        `/druid/indexer/v1/supervisor/${Api.encodePath(
                          supervisor.supervisor_id,
                        )}/status`,
                        { signal, timeout: STATUS_API_TIMEOUT },
                      )
                    ).data;
                    return {
                      ...supervisorsWithAuxiliaryInfo,
                      status: {
                        ...supervisorsWithAuxiliaryInfo.status,
                        [supervisor.supervisor_id]: status,
                      },
                    };
                  },
              ),
            );
          }

          if (visibleColumns.shown('Stats')) {
            auxiliaryQueries.push(
              ...filterMap(
                supervisors,
                (supervisor): AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo> | undefined => {
                  if (oneOf(supervisor.type, 'autocompact', 'scheduled_batch')) return; // These supervisors do not report stats
                  return async (supervisorsWithAuxiliaryInfo, signal) => {
                    const stats = (
                      await Api.instance.get(
                        `/druid/indexer/v1/supervisor/${Api.encodePath(
                          supervisor.supervisor_id,
                        )}/stats`,
                        { signal, timeout: STATS_API_TIMEOUT },
                      )
                    ).data;
                    return {
                      ...supervisorsWithAuxiliaryInfo,
                      stats: {
                        ...supervisorsWithAuxiliaryInfo.stats,
                        [supervisor.supervisor_id]: stats,
                      },
                    };
                  };
                },
              ),
            );
          }
        }

        return new ResultWithAuxiliaryWork<SupervisorsWithAuxiliaryInfo>(
          { supervisors, count, status: {}, stats: {} },
          auxiliaryQueries,
        );
      },
      onStateChange: supervisorsState => {
        this.setState({
          supervisorsState,
        });
      },
    });
  }

  componentDidMount() {
    this.fetchData();
  }

  componentWillUnmount(): void {
    this.supervisorQueryManager.terminate();
  }

  componentDidUpdate(
    prevProps: Readonly<SupervisorsViewProps>,
    prevState: Readonly<SupervisorsViewState>,
  ) {
    const { filters } = this.props;
    const { page, pageSize, sorted } = this.state;
    if (
      !filters.toSqlExpression().equals(prevProps.filters.toSqlExpression()) ||
      page !== prevState.page ||
      pageSize !== prevState.pageSize ||
      sortedToOrderByClause(sorted) !== sortedToOrderByClause(prevState.sorted)
    ) {
      this.fetchData();
    }
  }

  private readonly fetchData = () => {
    const { capabilities, filters } = this.props;
    const { visibleColumns, page, pageSize, sorted } = this.state;
    this.supervisorQueryManager.runQuery({
      page,
      pageSize,
      filtered: filters,
      sorted,
      visibleColumns,
      capabilities,
    });
  };

  private readonly handleFilterChange = (filters: TableFilters) => {
    this.goToFirstPage();
    this.props.onFiltersChange(filters);
  };

  private goToFirstPage() {
    if (this.state.page) {
      this.setState({ page: 0 });
    }
  }

  private readonly closeSpecDialogs = () => {
    this.setState({
      supervisorSpecDialogOpen: false,
    });
  };

  private readonly submitSupervisor = async (spec: JSON) => {
    try {
      await Api.instance.post('/druid/indexer/v1/supervisor', spec);
    } catch (e) {
      AppToaster.show({
        message: `Failed to submit supervisor: ${getDruidErrorMessage(e)}`,
        intent: Intent.DANGER,
      });
      return;
    }

    AppToaster.show({
      message: 'Supervisor submitted successfully',
      intent: Intent.SUCCESS,
    });
    this.supervisorQueryManager.rerunLastQuery();
  };

  private getSupervisorActions(supervisor: SupervisorQueryResultRow): BasicAction[] {
    const { supervisor_id, datasource, suspended, type } = supervisor;
    const { goToView, goToStreamingDataLoader } = this.props;

    const actions: BasicAction[] = [];
    if (oneOf(type, 'kafka', 'kinesis')) {
      actions.push(
        {
          icon: IconNames.MULTI_SELECT,
          title: 'Go to datasource',
          onAction: () => goToView('datasources', TableFilters.eq({ datasource })),
        },
        {
          icon: IconNames.CLOUD_UPLOAD,
          title: 'Open in data loader',
          onAction: () => goToStreamingDataLoader(supervisor_id),
        },
      );
    }

    actions.push(
      {
        icon: suspended ? IconNames.PLAY : IconNames.PAUSE,
        title: suspended ? 'Resume' : 'Suspend',
        onAction: () =>
          suspended
            ? this.setState({ resumeSupervisorId: supervisor_id })
            : this.setState({ suspendSupervisorId: supervisor_id }),
      },
      {
        icon: getConsoleViewIcon('tasks'),
        title: 'Go to tasks',
        onAction: () => this.goToTasksForSupervisor(supervisor),
      },
    );

    if (!suspended) {
      actions.push({
        icon: IconNames.AUTOMATIC_UPDATES,
        title: 'Handoff early',
        onAction: () => this.setState({ handoffSupervisorId: supervisor_id }),
      });
    }

    actions.push(
      {
        icon: IconNames.STEP_BACKWARD,
        title: `Set ${type === 'kinesis' ? 'sequence numbers' : 'offsets'}`,
        onAction: () => this.setState({ resetOffsetsSupervisorInfo: { id: supervisor_id, type } }),
      },
      {
        icon: IconNames.STEP_BACKWARD,
        title: 'Hard reset',
        intent: Intent.DANGER,
        onAction: () => this.setState({ resetSupervisorId: supervisor_id }),
      },
      {
        icon: IconNames.CROSS,
        title: 'Terminate',
        intent: Intent.DANGER,
        onAction: () => this.setState({ terminateSupervisorId: supervisor_id }),
      },
    );
    return actions;
  }

  renderResumeSupervisorAction() {
    const { resumeSupervisorId } = this.state;
    if (!resumeSupervisorId) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(
            `/druid/indexer/v1/supervisor/${Api.encodePath(resumeSupervisorId)}/resume`,
            {},
          );
          return resp.data;
        }}
        confirmButtonText="Resume supervisor"
        successText="Supervisor has been resumed"
        failText="Could not resume supervisor"
        intent={Intent.PRIMARY}
        onClose={() => {
          this.setState({ resumeSupervisorId: undefined });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>
          Are you sure you want to resume supervisor <Tag minimal>{resumeSupervisorId}</Tag>?
        </p>
      </AsyncActionDialog>
    );
  }

  renderSuspendSupervisorAction() {
    const { suspendSupervisorId } = this.state;
    if (!suspendSupervisorId) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(
            `/druid/indexer/v1/supervisor/${Api.encodePath(suspendSupervisorId)}/suspend`,
            {},
          );
          return resp.data;
        }}
        confirmButtonText="Suspend supervisor"
        successText="Supervisor has been suspended"
        failText="Could not suspend supervisor"
        intent={Intent.DANGER}
        onClose={() => {
          this.setState({ suspendSupervisorId: undefined });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>
          Are you sure you want to suspend supervisor <Tag minimal>{suspendSupervisorId}</Tag>?
        </p>
      </AsyncActionDialog>
    );
  }

  renderTaskGroupHandoffAction() {
    const { handoffSupervisorId } = this.state;
    if (!handoffSupervisorId) return;

    return (
      <TaskGroupHandoffDialog
        supervisorId={handoffSupervisorId}
        onClose={() => {
          this.setState({ handoffSupervisorId: undefined });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      />
    );
  }

  renderResetOffsetsSupervisorAction() {
    const { resetOffsetsSupervisorInfo } = this.state;
    if (!resetOffsetsSupervisorInfo) return;

    return (
      <SupervisorResetOffsetsDialog
        supervisorId={resetOffsetsSupervisorInfo.id}
        supervisorType={resetOffsetsSupervisorInfo.type}
        onClose={() => {
          this.setState({ resetOffsetsSupervisorInfo: undefined });
        }}
      />
    );
  }

  renderResetSupervisorAction() {
    const { resetSupervisorId } = this.state;
    if (!resetSupervisorId) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(
            `/druid/indexer/v1/supervisor/${Api.encodePath(resetSupervisorId)}/reset`,
            '',
          );
          return resp.data;
        }}
        confirmButtonText="Hard reset supervisor"
        successText="Supervisor has been hard reset"
        failText="Could not hard reset supervisor"
        intent={Intent.DANGER}
        onClose={() => {
          this.setState({ resetSupervisorId: undefined });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
        warningChecks={[
          <>
            I understand that resetting <Tag minimal>{resetSupervisorId}</Tag> will clear
            checkpoints and may lead to data loss or duplication.
          </>,
          'I understand that this operation cannot be undone.',
        ]}
      >
        <p>
          Are you sure you want to hard reset supervisor <Tag minimal>{resetSupervisorId}</Tag>?
        </p>
        <p>Hard resetting a supervisor may lead to data loss or data duplication.</p>
        <p>
          Use this operation to restore functionality when the supervisor stops operating due to
          missing offsets.
        </p>
      </AsyncActionDialog>
    );
  }

  renderTerminateSupervisorAction() {
    const { terminateSupervisorId } = this.state;
    if (!terminateSupervisorId) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(
            `/druid/indexer/v1/supervisor/${Api.encodePath(terminateSupervisorId)}/terminate`,
            {},
          );
          return resp.data;
        }}
        confirmButtonText="Terminate supervisor"
        successText="Supervisor has been terminated"
        failText="Could not terminate supervisor"
        intent={Intent.DANGER}
        onClose={() => {
          this.setState({ terminateSupervisorId: undefined });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>
          Are you sure you want to terminate supervisor <Tag minimal>{terminateSupervisorId}</Tag>?
        </p>
        <p>This action is not reversible.</p>
      </AsyncActionDialog>
    );
  }

  private renderSupervisorFilterableCell(field: string) {
    const { filters } = this.props;
    const { handleFilterChange } = this;

    return function SupervisorFilterableCell(row: { value: any }) {
      return (
        <TableFilterableCell
          field={field}
          value={row.value}
          filters={filters}
          onFiltersChange={handleFilterChange}
        />
      );
    };
  }

  private onSupervisorDetail(supervisor: SupervisorQueryResultRow) {
    this.setState({
      supervisorTableActionDialogId: supervisor.supervisor_id,
      supervisorTableActionDialogActions: this.getSupervisorActions(supervisor),
    });
  }

  private goToTasksForSupervisor(supervisor: SupervisorQueryResultRow) {
    const { goToView } = this.props;
    switch (supervisor.type) {
      case 'kafka':
      case 'kinesis':
        goToView(
          'tasks',
          TableFilters.eq({
            group_id: `index_${supervisor.type}_${supervisor.supervisor_id}`,
            type: `index_${supervisor.type}`,
          }),
        );
        return;

      case 'autocompact':
        goToView(
          'tasks',
          TableFilters.eq({
            datasource: supervisor.supervisor_id.replace(/^autocompact__/, ''),
            type: 'compact',
          }),
        );
        return;

      case 'scheduled_batch':
        goToView(
          'tasks',
          TableFilters.eq({
            datasource: supervisor.supervisor_id
              .replace(/^scheduled_batch__/, '')
              .replace(/__[0-9a-f-]+$/, ''),
            type: 'query_controller',
          }),
        );
        return;

      default:
        goToView('tasks', TableFilters.eq({ group_id: supervisor.supervisor_id }));
        return;
    }
  }

  private renderSupervisorTable() {
    const { filters } = this.props;
    const { supervisorsState, page, pageSize, sorted, statsKey, visibleColumns } = this.state;

    const { supervisors, count, status, stats } = supervisorsState.data || {
      supervisors: [],
      count: -1,
      status: {},
      stats: {},
    };
    return (
      <StatusContext.Provider value={status}>
        <StatsContext.Provider value={{ stats, statsKey }}>
          <ReactTable
            data={supervisors}
            pages={count >= 0 ? Math.ceil(count / pageSize) : 10000000} // We are hiding the page selector
            loading={supervisorsState.loading}
            noDataText={
              supervisorsState.isEmpty()
                ? 'No supervisors'
                : supervisorsState.getErrorMessage() || ''
            }
            manual
            filterable
            filtered={filters.toFilters()}
            onFilteredChange={filters => this.handleFilterChange(TableFilters.fromFilters(filters))}
            sorted={sorted}
            onSortedChange={sorted => this.setState({ sorted })}
            page={page}
            onPageChange={page => this.setState({ page })}
            pageSize={pageSize}
            onPageSizeChange={pageSize => this.setState({ pageSize })}
            pageSizeOptions={SMALL_TABLE_PAGE_SIZE_OPTIONS}
            showPagination
            showPageJump={false}
            ofText={count >= 0 ? `of ${formatInteger(count)}` : ''}
            columns={this.getTableColumns(visibleColumns, filters)}
          />
        </StatsContext.Provider>
      </StatusContext.Provider>
    );
  }

  private readonly getTableColumns = memoize(
    (
      visibleColumns: LocalStorageBackedVisibility,
      filters: TableFilters,
    ): Column<SupervisorQueryResultRow>[] => {
      return [
        {
          Header: 'Supervisor ID',
          id: 'supervisor_id',
          accessor: 'supervisor_id',
          width: 280,
          show: visibleColumns.shown('Supervisor ID'),
          Cell: ({ value, original }) => (
            <TableClickableCell
              tooltip="Show detail"
              onClick={() => this.onSupervisorDetail(original)}
              hoverIcon={IconNames.SEARCH_TEMPLATE}
            >
              {value}
            </TableClickableCell>
          ),
        },
        {
          Header: 'Datasource',
          accessor: 'datasource',
          width: 280,
          Cell: this.renderSupervisorFilterableCell('datasource'),
          show: visibleColumns.shown('Datasource'),
        },
        {
          Header: 'Type',
          accessor: 'type',
          width: 120,
          Cell: this.renderSupervisorFilterableCell('type'),
          show: visibleColumns.shown('Type'),
        },
        {
          Header: 'Topic/Stream',
          accessor: 'source',
          width: 200,
          Cell: this.renderSupervisorFilterableCell('source'),
          show: visibleColumns.shown('Topic/Stream'),
        },
        {
          Header: 'Status',
          id: 'detailed_state',
          width: 170,
          Filter: suggestibleFilterInput([
            'CONNECTING_TO_STREAM',
            'CREATING_TASKS',
            'DISCOVERING_INITIAL_TASKS',
            'IDLE',
            'INVALID_SPEC',
            'LOST_CONTACT_WITH_STREAM',
            'PENDING',
            'RUNNING',
            'SCHEDULER_STOPPED',
            'STOPPING',
            'SUSPENDED',
            'UNABLE_TO_CONNECT_TO_STREAM',
            'UNHEALTHY_SUPERVISOR',
            'UNHEALTHY_TASKS',
          ]),
          accessor: 'detailed_state',
          Cell: ({ value }) => (
            <TableFilterableCell
              field="detailed_state"
              value={value}
              filters={filters}
              onFiltersChange={this.handleFilterChange}
            >
              <span>
                <span style={{ color: detailedStateToColor(value) }}>&#x25cf;&nbsp;</span>
                {value}
              </span>
            </TableFilterableCell>
          ),
          show: visibleColumns.shown('Status'),
        },
        {
          Header: 'Configured tasks',
          id: 'configured_tasks',
          width: 130,
          accessor: 'spec',
          filterable: false,
          sortable: false,
          className: 'padded',
          Cell: ({ value }) => {
            if (!value) return null;
            const taskCount = deepGet(value, 'spec.ioConfig.taskCount');
            const replicas = deepGet(value, 'spec.ioConfig.replicas');
            if (typeof taskCount !== 'number' || typeof replicas !== 'number') return null;
            return (
              <div>
                <div>{formatInteger(taskCount * replicas)}</div>
                <div className="detail-line">
                  {replicas === 1
                    ? '(no replication)'
                    : `(${pluralIfNeeded(taskCount, 'task')} × ${pluralIfNeeded(
                        replicas,
                        'replica',
                      )})`}
                </div>
              </div>
            );
          },
          show: visibleColumns.shown('Configured tasks'),
        },
        {
          Header: 'Running tasks',
          id: 'running_tasks',
          accessor: 'supervisor_id',
          width: 150,
          filterable: false,
          sortable: false,
          Cell: ({ value, original }) => {
            const status = useContext(StatusContext);
            if (original.suspended) return;
            const { activeTasks, publishingTasks } = status[value]?.payload || {};
            let label: string | JSX.Element;
            if (Array.isArray(activeTasks)) {
              label = pluralIfNeeded(activeTasks.length, 'active task');
              if (nonEmptyArray(publishingTasks)) {
                label = (
                  <>
                    <div>{label}</div>
                    <div>{pluralIfNeeded(publishingTasks.length, 'publishing task')}</div>
                  </>
                );
              }
            } else {
              label = '';
            }

            return (
              <TableClickableCell
                tooltip="Go to tasks"
                onClick={() => this.goToTasksForSupervisor(original)}
                hoverIcon={IconNames.ARROW_TOP_RIGHT}
              >
                {label}
              </TableClickableCell>
            );
          },
          show: visibleColumns.shown('Running tasks'),
        },
        {
          Header: 'Aggregate lag',
          accessor: 'supervisor_id',
          width: 200,
          filterable: false,
          sortable: false,
          className: 'padded',
          show: visibleColumns.shown('Aggregate lag'),
          Cell: ({ value }) => {
            const status = useContext(StatusContext);
            const aggregateLag = status[value]?.payload?.aggregateLag;
            return isNumberLike(aggregateLag) ? formatInteger(aggregateLag) : null;
          },
        },
        {
          Header: twoLines(
            'Stats',
            <HeaderStatsKeySelector changeStatsKey={statsKey => this.setState({ statsKey })} />,
          ),
          id: 'stats',
          accessor: 'supervisor_id',
          width: 300,
          filterable: false,
          sortable: false,
          className: 'padded',
          show: visibleColumns.shown('Stats'),
          Cell: ({ value, original }) => {
            const { stats, statsKey } = useContext(StatsContext);
            const s = stats[value];
            if (!s) return;
            const activeTasks: SupervisorStatusTask[] | undefined = deepGet(
              original,
              'status.payload.activeTasks',
            );
            const activeTaskIds: string[] | undefined = Array.isArray(activeTasks)
              ? activeTasks.map((t: SupervisorStatusTask) => t.id)
              : undefined;

            const c = getTotalSupervisorStats(s, statsKey, activeTaskIds);
            const seconds = getRowStatsKeySeconds(statsKey);
            const issues = (c.processedWithError || 0) + (c.thrownAway || 0) + (c.unparseable || 0);
            const totalLabel = `Total (past ${statsKey}): `;
            return issues ? (
              <div>
                <div
                  data-tooltip={`${totalLabel}${formatBytes(c.processedBytes * seconds)}`}
                >{`Input: ${formatByteRate(c.processedBytes)}`}</div>
                {Boolean(c.processed) && (
                  <div
                    data-tooltip={`${totalLabel}${formatInteger(c.processed * seconds)} events`}
                  >{`Processed: ${formatRate(c.processed)}`}</div>
                )}
                {Boolean(c.processedWithError) && (
                  <div
                    className="warning-line"
                    data-tooltip={`${totalLabel}${formatInteger(
                      c.processedWithError * seconds,
                    )} events`}
                  >
                    Processed with error: {formatRate(c.processedWithError)}
                  </div>
                )}
                {Boolean(c.thrownAway) && (
                  <div
                    className="warning-line"
                    data-tooltip={`${totalLabel}${formatInteger(c.thrownAway * seconds)} events`}
                  >
                    Thrown away: {formatRate(c.thrownAway)}
                  </div>
                )}
                {Boolean(c.unparseable) && (
                  <div
                    className="warning-line"
                    data-tooltip={`${totalLabel}${formatInteger(c.unparseable * seconds)} events`}
                  >
                    Unparseable: {formatRate(c.unparseable)}
                  </div>
                )}
              </div>
            ) : c.processedBytes ? (
              <div
                data-tooltip={`${totalLabel}${formatInteger(
                  c.processed * seconds,
                )} events, ${formatBytes(c.processedBytes * seconds)}`}
              >{`Processed: ${formatRate(c.processed)} (${formatByteRate(c.processedBytes)})`}</div>
            ) : (
              <div>No activity</div>
            );
          },
        },
        {
          Header: 'Recent errors',
          accessor: 'supervisor_id',
          width: 150,
          filterable: false,
          sortable: false,
          show: visibleColumns.shown('Recent errors'),
          Cell: ({ value, original }) => {
            const status = useContext(StatusContext);
            const recentErrors = status[value]?.payload?.recentErrors;
            if (!recentErrors) return null;
            return (
              <TableClickableCell
                tooltip="Show errors"
                onClick={() => this.onSupervisorDetail(original)}
                hoverIcon={IconNames.SEARCH_TEMPLATE}
              >
                {pluralIfNeeded(recentErrors.length, 'error')}
              </TableClickableCell>
            );
          },
        },
        {
          Header: ACTION_COLUMN_LABEL,
          id: ACTION_COLUMN_ID,
          accessor: 'supervisor_id',
          width: ACTION_COLUMN_WIDTH,
          filterable: false,
          sortable: false,
          Cell: ({ value: id, original }) => {
            const supervisorActions = this.getSupervisorActions(original);
            return (
              <ActionCell
                onDetail={() => this.onSupervisorDetail(original)}
                actions={supervisorActions}
                menuTitle={id}
              />
            );
          },
        },
      ];
    },
  );

  renderBulkSupervisorActions() {
    const { capabilities, goToQuery } = this.props;
    const lastSupervisorQuery = this.supervisorQueryManager.getLastIntermediateQuery();

    return (
      <>
        <MoreButton>
          {capabilities.hasSql() && (
            <MenuItem
              icon={getConsoleViewIcon('workbench')}
              text="View SQL query for table"
              disabled={typeof lastSupervisorQuery !== 'string'}
              onClick={() => {
                if (typeof lastSupervisorQuery !== 'string') return;
                goToQuery({ queryString: lastSupervisorQuery });
              }}
            />
          )}
          <MenuItem
            icon={IconNames.MANUALLY_ENTERED_DATA}
            text="Submit JSON supervisor"
            onClick={() => this.setState({ supervisorSpecDialogOpen: true })}
          />
          <MenuItem
            icon={IconNames.PLAY}
            text="Resume all supervisors"
            onClick={() => this.setState({ showResumeAllSupervisors: true })}
          />
          <MenuItem
            icon={IconNames.PAUSE}
            text="Suspend all supervisors"
            onClick={() => this.setState({ showSuspendAllSupervisors: true })}
          />
          <MenuItem
            icon={IconNames.CROSS}
            text="Terminate all supervisors"
            intent={Intent.DANGER}
            onClick={() => this.setState({ showTerminateAllSupervisors: true })}
          />
        </MoreButton>
        {this.renderResumeAllSupervisorAction()}
        {this.renderSuspendAllSupervisorAction()}
        {this.renderTerminateAllSupervisorAction()}
      </>
    );
  }

  renderResumeAllSupervisorAction() {
    const { showResumeAllSupervisors } = this.state;
    if (!showResumeAllSupervisors) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/resumeAll`, {});
          return resp.data;
        }}
        confirmButtonText="Resume all supervisors"
        successText="All supervisors have been resumed"
        failText="Could not resume all supervisors"
        intent={Intent.PRIMARY}
        onClose={() => {
          this.setState({ showResumeAllSupervisors: false });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>Are you sure you want to resume all the supervisors?</p>
      </AsyncActionDialog>
    );
  }

  renderSuspendAllSupervisorAction() {
    const { showSuspendAllSupervisors } = this.state;
    if (!showSuspendAllSupervisors) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/suspendAll`, {});
          return resp.data;
        }}
        confirmButtonText="Suspend all supervisors"
        successText="All supervisors have been suspended"
        failText="Could not suspend all supervisors"
        intent={Intent.DANGER}
        onClose={() => {
          this.setState({ showSuspendAllSupervisors: false });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>Are you sure you want to suspend all the supervisors?</p>
      </AsyncActionDialog>
    );
  }

  renderTerminateAllSupervisorAction() {
    const { showTerminateAllSupervisors } = this.state;
    if (!showTerminateAllSupervisors) return;

    return (
      <AsyncActionDialog
        action={async () => {
          const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/terminateAll`, {});
          return resp.data;
        }}
        confirmButtonText="Terminate all supervisors"
        successText="All supervisors have been terminated"
        failText="Could not terminate all supervisors"
        intent={Intent.DANGER}
        onClose={() => {
          this.setState({ showTerminateAllSupervisors: false });
        }}
        onSuccess={() => {
          this.supervisorQueryManager.rerunLastQuery();
        }}
      >
        <p>Are you sure you want to terminate all the supervisors?</p>
      </AsyncActionDialog>
    );
  }

  render() {
    const {
      supervisorSpecDialogOpen,
      alertErrorMsg,
      supervisorTableActionDialogId,
      supervisorTableActionDialogActions,
      visibleColumns,
    } = this.state;

    return (
      <div className="supervisors-view app-view">
        <ViewControlBar label="Supervisors">
          <RefreshButton
            localStorageKey={LocalStorageKeys.SUPERVISORS_REFRESH_RATE}
            onRefresh={auto => {
              if (auto && hasOverlayOpen()) return;
              this.supervisorQueryManager.rerunLastQuery(auto);
            }}
          />
          {this.renderBulkSupervisorActions()}
          <TableColumnSelector
            columns={SUPERVISOR_TABLE_COLUMNS}
            onChange={column =>
              this.setState(prevState => ({
                visibleColumns: prevState.visibleColumns.toggle(column),
              }))
            }
            onClose={this.fetchData}
            tableColumnsHidden={visibleColumns.getHiddenColumns()}
          />
        </ViewControlBar>
        {this.renderSupervisorTable()}
        {this.renderResumeSupervisorAction()}
        {this.renderSuspendSupervisorAction()}
        {this.renderTaskGroupHandoffAction()}
        {this.renderResetOffsetsSupervisorAction()}
        {this.renderResetSupervisorAction()}
        {this.renderTerminateSupervisorAction()}
        {supervisorSpecDialogOpen && (
          <SpecDialog
            onClose={this.closeSpecDialogs}
            onSubmit={this.submitSupervisor}
            title="Submit supervisor"
          />
        )}
        <AlertDialog
          icon={IconNames.ERROR}
          intent={Intent.PRIMARY}
          isOpen={Boolean(alertErrorMsg)}
          confirmButtonText="OK"
          onConfirm={() => this.setState({ alertErrorMsg: undefined })}
        >
          <p>{alertErrorMsg}</p>
        </AlertDialog>
        {supervisorTableActionDialogId && (
          <SupervisorTableActionDialog
            supervisorId={supervisorTableActionDialogId}
            actions={supervisorTableActionDialogActions}
            onClose={() => this.setState({ supervisorTableActionDialogId: undefined })}
          />
        )}
      </div>
    );
  }
}
